/*
 * @Author: Wangjun
 * @Date: 2021-07-08 14:25:38
 * @LastEditTime: 2025-05-25 21:51:14
 * @LastEditors: wangjun haodreams@163.com
 * @Description:写入redis
 * @FilePath: \xr_historyd:\go\src\gitee.com\haodreams\golib\easyredis\redis.go
 * hnxr
 */

package easyredis

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net"
	"strings"
	"time"

	"gitee.com/haodreams/golib/logs"
	"gitee.com/haodreams/libs/config"
	"gitee.com/haodreams/libs/ee"
	"gitee.com/haodreams/libs/proxy"
	"github.com/redis/go-redis/v9"
)

// maxNum is the number of strings (field/value entries, two per element)
// that HMSet accumulates in its scratch slice before flushing a batch.
const maxNum = 5000

// ErrRedisInvalid is returned by Redis methods when the instance is not
// enabled (never set up successfully, or already closed).
var ErrRedisInvalid = errors.New("redis is not available")

// RedisSetter is the element type accepted by Redis.HMSet. The value itself
// is JSON-encoded and stored under the hash field named by GetID.
type RedisSetter interface {
	// GetID returns the hash field name used for this element.
	GetID() string
}

// Redis wraps a go-redis client together with the settings used to build it.
// All command methods check enable first and return ErrRedisInvalid (or nil
// for XAutoClaim) when it is false.
type Redis struct {
	enable     bool // whether the redis write feature is enabled
	option     redis.Options // options handed to redis.NewClient in single-node mode
	masterName string // sentinel master name used when several addresses are configured
	client     *redis.Client // underlying client; set by setup, cleared by Close
	ctx        context.Context // context passed to every command issued by this wrapper
	proxy      func(ctx context.Context, network, addr string) (net.Conn, error) // optional proxy dial function used by Dial
}

// Option is a functional option applied to a Redis by NewRedis before the
// client is created.
type Option func(r *Redis)

// NewSimpleRedis allocates a Redis and initialises it through Setup, which
// reads its settings from the config package using prefix as the key prefix.
// The allocated *Redis is returned even when Setup reports an error.
func NewSimpleRedis(prefix string) (r *Redis, err error) {
	r = &Redis{}
	if err = r.Setup(prefix); err != nil {
		return r, err
	}
	return r, nil
}

// WithPassword returns an Option that stores pass as the password in the
// connection options.
func WithPassword(pass string) Option {
	return Option(func(target *Redis) {
		target.option.Password = pass
	})
}

// WithDB returns an Option that selects the database index db in the
// connection options.
func WithDB(db int) Option {
	return Option(func(target *Redis) {
		target.option.DB = db
	})
}

// WithProxy returns an Option that routes connections through the proxy at
// proxyAddr. An empty proxyAddr leaves the Redis untouched.
//
// NOTE: an error from proxy.ContextDialFunc is not reported to the caller;
// in that case the custom dialer is simply not installed.
func WithProxy(proxyAddr string) Option {
	return func(r *Redis) {
		if proxyAddr == "" {
			return
		}
		dial, err := proxy.ContextDialFunc(proxyAddr)
		r.proxy = dial
		if err == nil {
			r.option.Dialer = r.Dial
		}
	}
}

// WithMasterName returns an Option that sets the sentinel master name used
// when the host string contains more than one address.
func WithMasterName(masterName string) Option {
	return Option(func(target *Redis) {
		target.masterName = masterName
	})
}

// WithReadTimeout returns an Option that overrides the read timeout stored
// in the connection options.
func WithReadTimeout(timeout time.Duration) Option {
	return Option(func(target *Redis) {
		target.option.ReadTimeout = timeout
	})
}

// WithWriteTimeout returns an Option that overrides the write timeout stored
// in the connection options.
func WithWriteTimeout(timeout time.Duration) Option {
	return Option(func(target *Redis) {
		target.option.WriteTimeout = timeout
	})
}

// NewRedis builds a Redis for host, applying opts on top of the defaults
// (15 second read and write timeouts). host may be a single address or a
// comma-separated list of sentinel addresses (see setup).
//
// When host is empty the options are still applied, but no client is created
// and an error is returned alongside the (disabled) *Redis.
func NewRedis(host string, opts ...Option) (r *Redis, err error) {
	r = &Redis{}
	r.option.Addr = host
	r.option.ReadTimeout = 15 * time.Second
	r.option.WriteTimeout = 15 * time.Second

	for i := range opts {
		opts[i](r)
	}

	if host == "" {
		err = ee.NewError("禁用REDIS服务").Print(logs.CbInfo)
		return r, err
	}

	r.ctx = context.Background()
	err = r.setup()
	return r, err
}

// Setup initialises the receiver from the config package and then creates
// the client via setup.
//
// Keys read (prefix is prepended unless noted):
//   - redis_host: address or comma-separated sentinel addresses; when empty
//     the instance is disabled and an error is returned
//   - redis_password
//   - redis_default_db (default 6)
//   - redis_master_name (default "redis-sentinel")
//   - proxy (no prefix): optional proxy address for the dialer
func (m *Redis) Setup(prefix string) (err error) {
	m.ctx = context.Background()

	host := config.String(prefix + "redis_host")
	m.option.Addr = host
	if host == "" {
		err = ee.NewError("无效的redis地址,关闭redis写入功能").Print(logs.CbError)
		m.enable = false
		return err
	}

	m.option.Password = config.String(prefix + "redis_password")
	m.option.DB = config.DefaultInt(prefix+"redis_default_db", 6)

	if proxyAddr := config.String("proxy"); proxyAddr != "" {
		if m.proxy, err = proxy.ContextDialFunc(proxyAddr); err != nil {
			return err
		}
		m.option.Dialer = m.Dial
	}

	m.masterName = config.DefaultString(prefix+"redis_master_name", "redis-sentinel")
	return m.setup()
}

// setup creates the underlying client from the already populated options and
// marks the instance as enabled on success.
//
// If the configured address contains a comma it is treated as a list of
// sentinel addresses: a failover client is created with fixed 60 second
// read/write timeouts and verified with a PING. Otherwise a single-node
// client is created from m.option without contacting the server.
//
// When the sentinel PING fails, the freshly created client is closed and the
// instance is left disabled, so no connection pool is leaked (Close would
// otherwise skip it because enable is false).
func (m *Redis) setup() (err error) {
	addrs := strings.Split(m.option.Addr, ",")
	if len(addrs) > 1 { // sentinel deployment
		m.client = redis.NewFailoverClient(&redis.FailoverOptions{
			MasterName:    m.masterName,
			SentinelAddrs: addrs,
			Password:      m.option.Password,
			DB:            m.option.DB,
			ReadTimeout:   60 * time.Second,
			WriteTimeout:  60 * time.Second,
		})
		result, pingErr := m.client.Ping(m.ctx).Result()
		if pingErr != nil {
			logs.Error("ping err :", pingErr)
			// The ping failure is the error worth reporting; a secondary
			// close error would only obscure it.
			_ = m.client.Close()
			m.client = nil
			m.enable = false
			return ee.Print(pingErr, logs.CbError)
		}
		fmt.Println(result)
	} else { // single-node deployment
		m.client = redis.NewClient(&m.option)
		if m.client == nil {
			err = ee.NewError("无效的redis客户端")
			return ee.Print(err, logs.CbError)
		}
	}

	m.enable = true
	return nil
}

// Enable reports whether the instance currently has a usable client, i.e.
// setup succeeded and Close has not been called since.
func (m *Redis) Enable() bool {
	return m.enable
}

// Ping sends PING to the server. It returns ErrRedisInvalid when the
// instance is disabled; on success the server reply is printed to stdout,
// on failure the error is logged and returned.
func (m *Redis) Ping() (err error) {
	if !m.enable {
		return ErrRedisInvalid
	}
	pong, err := m.client.Ping(m.ctx).Result()
	if err == nil {
		fmt.Println(pong)
		return nil
	}
	logs.Error("ping err :", err)
	return ee.Print(err, logs.CbError)
}

// Close disables the instance and closes the underlying client, returning
// the client's close error. Calling it on a disabled instance is a no-op.
func (m *Redis) Close() (err error) {
	if !m.enable {
		return nil
	}
	m.enable = false
	// Work on a local copy so the field can be cleared after closing.
	if c := m.client; c != nil {
		err = c.Close()
		m.client = nil
	}
	return err
}

// Dial is the dialer installed into the client options when a proxy is
// configured. It refuses to connect while the instance is disabled, uses the
// proxy dial function when one is set, and otherwise falls back to a plain
// net.Dialer with a 5 second timeout and 5 minute keep-alive.
func (m *Redis) Dial(ctx context.Context, network, addr string) (net.Conn, error) {
	switch {
	case !m.enable:
		return nil, ErrRedisInvalid
	case m.proxy != nil:
		return m.proxy(ctx, network, addr)
	}
	fallback := net.Dialer{
		Timeout:   5 * time.Second,
		KeepAlive: 5 * time.Minute,
	}
	return fallback.DialContext(ctx, network, addr)
}

// hmset writes kvs, a flat [field1, value1, field2, value2, ...] slice, into
// the hash at key. It does not check m.enable; callers must do so. On
// success msg describes the elapsed time and the number of pairs written;
// on failure the error is logged and returned with an empty msg.
func (m *Redis) hmset(key string, kvs []string) (msg string, err error) {
	start := time.Now()
	if err = m.client.HMSet(m.ctx, key, kvs).Err(); err != nil {
		logs.Error(err)
		return "", err
	}
	msg = fmt.Sprint("Write to redis used time:", time.Since(start), "insert number:", len(kvs)/2)
	return msg, nil
}

// HGET returns the value stored under field in the hash at key. It returns
// ErrRedisInvalid when the instance is disabled, otherwise whatever error
// the client reports.
func (m *Redis) HGET(key, field string) (result string, err error) {
	if !m.enable {
		return "", ErrRedisInvalid
	}
	cmd := m.client.HGet(m.ctx, key, field)
	if cmdErr := cmd.Err(); cmdErr != nil {
		return "", cmdErr
	}
	return cmd.Result()
}

// HVALS returns all values stored in the hash at key. It returns
// ErrRedisInvalid when the instance is disabled; on a command error the
// result is nil.
func (m *Redis) HVALS(key string) (result []string, err error) {
	if !m.enable {
		return nil, ErrRedisInvalid
	}
	cmd := m.client.HVals(m.ctx, key)
	if cmdErr := cmd.Err(); cmdErr != nil {
		return nil, cmdErr
	}
	return cmd.Result()
}

// Set stores value under field in the hash at key and returns the timing
// message produced by hmset. It returns ErrRedisInvalid when the instance is
// disabled.
func (m *Redis) Set(key, field, value string) (msg string, err error) {
	if m.enable {
		pair := []string{field, value}
		return m.hmset(key, pair)
	}
	return "", ErrRedisInvalid
}

// Get is an alias for HGET: it returns the value stored under field in the
// hash at key, or ErrRedisInvalid when the instance is disabled.
func (m *Redis) Get(key, field string) (result string, err error) {
	if m.enable {
		return m.HGET(key, field)
	}
	return "", ErrRedisInvalid
}

// Delete removes key from redis. It returns ErrRedisInvalid when the
// instance is disabled, otherwise the error reported by the DEL command
// (previously that error was silently dropped).
func (m *Redis) Delete(key string) (err error) {
	if !m.enable {
		return ErrRedisInvalid
	}
	return m.client.Del(m.ctx, key).Err()
}

// DeleteField removes field from the hash at key. It returns ErrRedisInvalid
// when the instance is disabled, otherwise the error reported by the HDEL
// command (previously that error was silently dropped).
func (m *Redis) DeleteField(key, field string) (err error) {
	if !m.enable {
		return ErrRedisInvalid
	}
	return m.client.HDel(m.ctx, key, field).Err()
}

// HMSet stores every non-nil element of list in the hash at key, using
// GetID() as the field and the element's JSON encoding as the value. An
// element that cannot be marshalled is stored as "{}".
//
// Elements are written in batches: the pending field/value slice is flushed
// whenever it reaches maxNum strings, and once more at the end. The first
// batch that fails aborts the operation and its error is returned; earlier
// batches remain written.
//
// On success msg reports the number of elements written and the elapsed
// time. ErrRedisInvalid is returned when the instance is disabled.
func (m *Redis) HMSet(key string, list []RedisSetter) (msg string, err error) {
	if !m.enable {
		return "", ErrRedisInvalid
	}

	start := time.Now()
	total := 0
	kvs := make([]string, 0, maxNum)

	for _, item := range list {
		if item == nil {
			continue
		}
		id := item.GetID()
		// Best effort: keep the field present even if the value cannot be
		// encoded.
		value := "{}"
		if data, marshalErr := json.Marshal(item); marshalErr == nil {
			value = string(data)
		}
		kvs = append(kvs, id, value)
		total++

		if len(kvs) >= maxNum {
			if _, err = m.hmset(key, kvs); err != nil {
				return "", err
			}
			kvs = kvs[:0] // reuse the backing array for the next batch
		}
	}

	if len(kvs) > 0 {
		if _, err = m.hmset(key, kvs); err != nil {
			return "", err
		}
	}

	msg = fmt.Sprint("Insert to redis number:", total, " used time:", time.Since(start))
	return msg, nil
}

// XADD appends a message to the stream named topic and returns the ID
// reported by the server. data is passed as the entry's values. The stream
// is trimmed approximately to a maximum length, which defaults to 1000 and
// can be overridden by the first element of sizes.
// ErrRedisInvalid is returned when the instance is disabled.
func (m *Redis) XADD(topic string, data []string, sizes ...int64) (msg string, err error) {
	if !m.enable {
		return "", ErrRedisInvalid
	}

	var maxLen int64 = 1000
	if len(sizes) != 0 {
		maxLen = sizes[0]
	}

	args := redis.XAddArgs{
		Stream: topic,
		Values: data,
		MaxLen: maxLen,
		Approx: true,
	}
	return m.client.XAdd(m.ctx, &args).Result()
}

// XGroupCreate makes sure the consumer group exists on the stream topic.
//
// group is the consumer group name and topic is the stream name. The
// existing groups are listed first; if group is already present the call
// returns ("", nil) without touching it. Otherwise the group is created
// starting at ids[0], or at "$" when no ID is given.
//
// An error from listing the groups is returned as-is (presumably this
// includes the case where the stream does not exist yet — verify against
// the server). ErrRedisInvalid is returned when the instance is disabled.
func (m *Redis) XGroupCreate(group, topic string, ids ...string) (msg string, err error) {
	if !m.enable {
		return "", ErrRedisInvalid
	}

	// Check whether the consumer group already exists.
	existing, err := m.client.XInfoGroups(m.ctx, topic).Result()
	if err != nil {
		return "", err
	}
	for idx := range existing {
		if existing[idx].Name == group {
			return "", nil
		}
	}

	startID := "$"
	if len(ids) > 0 {
		startID = ids[0]
	}
	return m.client.XGroupCreate(m.ctx, topic, group, startID).Result()
}

// XReadGroup reads up to 100 new messages from the stream topic on behalf of
// consumer within the consumer group, blocking for at most 15 seconds.
//
// group is the consumer group name, topic is the stream name and consumer is
// the consumer name. Within a group each message is delivered to only one
// consumer.
//
// When the blocking read times out without data (redis.Nil) the call returns
// (nil, nil). ErrRedisInvalid is returned when the instance is disabled.
//
// The former trailing XGROUP SETID call with ">" was dropped: ">" is only
// meaningful to XREADGROUP, so that call could only fail, and its result was
// ignored anyway.
func (m *Redis) XReadGroup(group, consumer, topic string) (msgs []redis.XStream, err error) {
	if !m.enable {
		return nil, ErrRedisInvalid
	}

	msgs, err = m.client.XReadGroup(m.ctx, &redis.XReadGroupArgs{
		Group:    group,
		Consumer: consumer,
		Streams:  []string{topic, ">"}, // ">" = only messages never delivered to this group
		Count:    100,
		Block:    15 * time.Second,
	}).Result()
	if err != nil {
		if errors.Is(err, redis.Nil) {
			// Nothing arrived within the block window; not an error.
			return nil, nil
		}
		return nil, err
	}
	return msgs, nil
}

// XAutoClaim transfers ownership of pending messages in group on stream to
// consumer and returns the raw command so the caller can read both the
// claimed messages and the next start cursor.
//
// minIdle is the minimum idle time in seconds a pending message must have
// before it is claimed. start is the ID to begin scanning from; an empty
// start means "0-0" (the beginning of the pending list). Previously start
// was ignored and "0-0" was always used, which made it impossible to
// continue from a returned cursor. count limits the number of messages
// claimed per call.
//
// It returns nil when the instance is disabled, so callers must nil-check
// the result before using it.
func (m *Redis) XAutoClaim(stream string, group string, consumer string, minIdle int64, start string, count int64) *redis.XAutoClaimCmd {
	if !m.enable {
		return nil
	}
	if start == "" {
		start = "0-0"
	}
	return m.client.XAutoClaim(m.ctx, &redis.XAutoClaimArgs{
		Stream:   stream,
		Group:    group,
		Consumer: consumer,
		MinIdle:  time.Duration(minIdle) * time.Second,
		Start:    start,
		Count:    count,
	})
}

// XAck acknowledges the messages identified by ids for group on the stream
// topic and returns the count reported by the server. ErrRedisInvalid is
// returned when the instance is disabled.
func (m *Redis) XAck(topic, group string, ids ...string) (v int64, err error) {
	if m.enable {
		return m.client.XAck(m.ctx, topic, group, ids...).Result()
	}
	return 0, ErrRedisInvalid
}

// func (m *Redis) XPending(topic, group, consumer, start, end string, count int64) (*redis.XPending, error) {
// 	if !m.enable {
// 		return nil,nil
// 	}

// // 	type XPendingExtArgs struct {
// // 	Stream   string
// // 	Group    string
// // 	Idle     time.Duration
// // 	Start    string
// // 	End      string
// // 	Count    int64
// // 	Consumer string
// // }
// 	 m.client.XPendingExt(m.ctx, topic, &redis.XPendingExtArgs{
// 		Stream: topic,

// 	 })
// }
